KAFKA-6145: Pt 1. Bump protocol version and encode task lag map #8121
Call for review @cadonna @vvcephei cc/ @guozhangwang
topics,
standbyTasks,
rebalanceProtocol);
// 2. Map from task id to its overall lag
This plus the tech debt cleanup allows for the subscription handling to be greatly simplified, here and below in #assign
0e4db53 to b51b63b
  /**
   * Returns ids of tasks whose states are kept on the local storage. This includes active, standby, and previously
   * assigned but not yet cleaned up tasks
   */
- public Set<TaskId> tasksOnLocalStorage() {
+ Set<TaskId> tasksOnLocalStorage() {
This was only ever used to encode the subscription info, which is now all handled by getTaskLags
This method could actually become private, except for a single test. I'm wondering if we can port that test to use getTaskLags instead. Aside from letting us make this private, that would probably improve our testing coverage, since I suppose that test was intended to unit test this class, meaning it should be testing this class's public API, not internal methods.
This method is getting refactored somewhat heavily in the next PR, where we actually collect the offsets and will lay in some heavy testing. Still trying to strike a balance between not writing/fixing up any tests until the end, and spending time on 50 tests per PR that are all rendered useless as soon as I start the next one.
But, I will tighten up this test
@ableegoldman Thank you for the PR!
Here is my feedback:
@@ -65,6 +70,27 @@
      "type": "int32"
    }
  ]
},
{
  "name": "TaskLagPair",
req: I understand why you called this a pair. However, it seems odd that this pair consists of three fields. Could you call it TaskLagTriple?
Ack, will fix this if it turns out nested structs aren't possible
streams/src/main/resources/common/message/SubscriptionInfo.json
{
  "name": "topicGroupId",
  "versions": "1+",
  "type": "int32"
},
{
  "name": "partition",
  "versions": "1+",
  "type": "int32"
},
Q: Is it not possible to use the struct TaskId here? If not, the versions field of TaskId (and of all nested fields?) should be set to 1-6, shouldn't they? TaskId is only used for the fields that are removed in version 7.
Working on finding someone who knows the code generation code to clear things up here (whether we can have nested structs, and what is the correct version for structs/inner fields)
Alright, so the good/bad news is that this only works if the nested struct is an array (ie can do type: []TaskId but not type: TaskId). There's no reason for this to be the case, besides "didn't have the time and need to implement it", so we could in theory just add this ability.
I haven't looked into the code really so I'm not sure how much time that might take, so, I'm deciding to just encode as basic types for now. It's only two fields after all
5b5af30 to eae9f3b
Hey @ableegoldman , I did a partial pass. I'll have to review the rest later.
if (!state.ownedPartitions().isEmpty()) {
// this is an optimization: we can't decode the future subscription info's prev tasks, but we can figure
// them out from the encoded ownedPartitions
if (uuid == futureId && !state.ownedPartitions().isEmpty()) {
Just to be clear, does this mean we're certain that for non-future members (current or older-versioned ones), the encoded "prevTasks" actually contains all the previous tasks?
I gather this is true from the SubscriptionInfo protocol:
{
"name": "prevTasks",
"versions": "1-6",
"type": "[]TaskId"
}
But then, I'm a little mystified by the prior comment... why would "active tasks" not have been encoded with the cooperative protocol?
Let me give a quick history lesson to clarify:
By adding the ownedPartitions field to the subscription we were able to remove the encoded prevTasks and avoid duplicating info we could get from the new field for COOPERATIVE members. Note the ownedPartitions are the source of truth and may differ from the prevTasks that would have been encoded in edge cases (eg topic deletion, partitions lost)
In hindsight, aka a few weeks back when I started looking at the assignor code again, I realized this was just unnecessary and likely to lead to more trouble than it solves. So, now we just encode all tasks in the offset map regardless of rebalance protocol.
Ok, while explaining that I now realize we obviously still need to fill in the prevTasks from the ownedPartitions for members on 2.5/2.6 -- thanks Socrates :P
if (isActive(id)) {
    taskLags.put(id, ACTIVE_TASK_SENTINEL_LAG);
} else {
    taskLags.put(id, 0);
- taskLags.put(id, 0);
+ taskLags.put(id, STANDBY_TASK_SENTINEL_LAG);
Either that, or my preference would actually be to inline both sentinel lags (with a comment explaining why that choice of sentinels).
@@ -103,7 +105,8 @@ InternalTopologyBuilder builder() {
  return builder;
  }

- void handleRebalanceStart(final Set<String> subscribedTopics) {
+ // visible for testing
+ public void handleRebalanceStart(final Set<String> subscribedTopics) {
To be honest, this also bugs me :)
What if we move org.apache.kafka.streams.tests.StreamsUpgradeTest.FutureStreamsPartitionAssignor into package org.apache.kafka.streams.processor.internals? Then package-private would continue to work fine.
My personal bias is that anytime you see // visible for testing, you're looking at a potential bug, because nothing prevents that comment from becoming false, and in fact, I have found such comments in our code base that were already false. Either this method is part of the public contract of the class, or it's not.
That said, if you really prefer it this way, we can keep it (although, I might ask you to review a clean-up PR later ;) )
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
final Set<TaskId> standbyTasks = new HashSet<>();

for (final Map.Entry<TaskId, Integer> taskOffsetSum : taskOffsetSums.entrySet()) {
    if (taskOffsetSum.getValue() == -1) {
Oh, actually, here's the reason a constant sentinel is nice, but we didn't actually use it!
🤦‍♀️
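The point about the named sentinel can be illustrated with a minimal sketch in which the same constants are used on both the encoding and the decoding side. This is not the PR's code: plain strings stand in for TaskId, the class and method names (SentinelRoundTrip, encode, activeTasks) are hypothetical, and the values -1 and 0 are assumed from the literals visible in the snippets above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class SentinelRoundTrip {
    // Assumed values, taken from the literals in the review snippets (-1 and 0).
    static final int ACTIVE_TASK_SENTINEL_LAG = -1;
    static final int STANDBY_TASK_SENTINEL_LAG = 0;

    // Encoding side: every locally stored task gets one of the two sentinels.
    static Map<String, Integer> encode(final Set<String> localTasks, final Set<String> running) {
        final Map<String, Integer> taskLags = new HashMap<>();
        for (final String id : localTasks) {
            taskLags.put(id, running.contains(id) ? ACTIVE_TASK_SENTINEL_LAG : STANDBY_TASK_SENTINEL_LAG);
        }
        return taskLags;
    }

    // Decoding side: compare against the named constant instead of a bare -1.
    static Set<String> activeTasks(final Map<String, Integer> taskLags) {
        final Set<String> active = new HashSet<>();
        for (final Map.Entry<String, Integer> entry : taskLags.entrySet()) {
            if (entry.getValue() == ACTIVE_TASK_SENTINEL_LAG) {
                active.add(entry.getKey());
            }
        }
        return active;
    }

    public static void main(final String[] args) {
        final Set<String> local = new HashSet<>(Arrays.asList("0_0", "0_1"));
        final Set<String> running = new HashSet<>(Arrays.asList("0_0"));
        System.out.println(activeTasks(encode(local, running))); // prints [0_0]
    }
}
```

If the sentinel value ever changes, both sides change together, which is the benefit a literal on the decoding side gives up.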
Thanks for the update, @ableegoldman ! I've completed my pass and left a few more comments.
}).collect(Collectors.toList()));
}

private static void setPrevAndStandbySetsFromParsedTaskOffsetSumMap(final SubscriptionInfoData data,
Since we invoke this method from a number of places, should we add a flag and make sure it only sets the state once?
bump
I realized the other callers actually don't need to call this at all, so now these only get called from the constructor
  public LegacySubscriptionInfoSerde(final int version,
  final int latestSupportedVersion,
  final UUID processId,
  final Set<TaskId> prevTasks,
  final Set<TaskId> standbyTasks,
- final String userEndPoint) {
+ final String userEndPoint,
+ final Map<TaskId, Integer> taskLags) {
I'm probably missing the point here, but I think the idea of this class is that it should not change in response to changes in SubscriptionInfo. I think it's supposed to be a stand-in for the behavior of older Streams versions when the cluster has old and new members running at the same time. Maybe it doesn't really work that way, though, in which case, I might doubt the utility of this class at all, and instead recommend relying on the system tests. Can you comment?
.../test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
ACTIVE_TASKS,
STANDBY_TASKS,
"localhost:80",
null
Yes, it should... I asked a question about this on that class itself. It seems like you shouldn't have had to modify it at all.
streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
  final Set<TaskId> standbyTasks,
- final String userEndPoint) {
+ final String userEndPoint,
+ final Map<TaskId, Integer> taskLags) {
I'm not sure this should be necessary either. IIUC, the "future" subscription info isn't supposed to really be a descendant of the current protocol, just a stand-in for some protocol version bigger than ours, in which case all that really matters is the version number. Its role is just to join the cluster and get downgraded to the "latest" version, in which case it should be able to defer to SubscriptionInfo.
Good point
Co-Authored-By: Bruno Cadonna <[email protected]>
c0001fc to 951c245
Thanks @ableegoldman , just a couple of final thoughts, then I think it's good to go.
{
  "name": "taskOffsetSums",
  "versions": "7+",
  "type": "[]TaskOffsetSum"
Do we want to try for a slightly more efficient encoding here, of Map[topicGroupId -> Map[partition -> offsetSum]], or do you think this is fine for now?
I was vaguely hoping that we'd add the ability to use nested structs in the near future and could move to use the TaskId struct here, but I suppose we may as well take advantage of this to go for the more efficient encoding when possible. Will do
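As a rough sketch of the Map[topicGroupId -> Map[partition -> offsetSum]] shape proposed above, the flat per-task map can be regrouped so that each topicGroupId appears once rather than once per task. The names here (NestedOffsetSumEncoding, TaskKey, group) are hypothetical; TaskKey is only a stand-in for TaskId, and the actual wire format is defined by the generated SubscriptionInfoData, not by this code.

```java
import java.util.Map;
import java.util.TreeMap;

final class NestedOffsetSumEncoding {
    // Hypothetical stand-in for TaskId: a topic group id plus a partition.
    static final class TaskKey implements Comparable<TaskKey> {
        final int topicGroupId;
        final int partition;

        TaskKey(final int topicGroupId, final int partition) {
            this.topicGroupId = topicGroupId;
            this.partition = partition;
        }

        @Override
        public int compareTo(final TaskKey other) {
            final int byGroup = Integer.compare(topicGroupId, other.topicGroupId);
            return byGroup != 0 ? byGroup : Integer.compare(partition, other.partition);
        }
    }

    // Regroup task -> offsetSum into topicGroupId -> (partition -> offsetSum).
    static Map<Integer, Map<Integer, Long>> group(final Map<TaskKey, Long> flat) {
        final Map<Integer, Map<Integer, Long>> nested = new TreeMap<>();
        for (final Map.Entry<TaskKey, Long> entry : flat.entrySet()) {
            nested.computeIfAbsent(entry.getKey().topicGroupId, g -> new TreeMap<>())
                  .put(entry.getKey().partition, entry.getValue());
        }
        return nested;
    }

    public static void main(final String[] args) {
        final Map<TaskKey, Long> flat = new TreeMap<>();
        flat.put(new TaskKey(0, 0), 10L);
        flat.put(new TaskKey(0, 1), 20L);
        flat.put(new TaskKey(1, 0), 5L);
        System.out.println(group(flat)); // prints {0={0=10, 1=20}, 1={0=5}}
    }
}
```

With many partitions per topic group, the saving is one int32 per task beyond the first in each group.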
private static ByteBuffer encodeFutureVersion() {
    final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */
        + 4 /* supported version */);
looks like maybe we have duelling code formatters. Not sure which choice makes more sense.
Sounds like you think the other choice makes more sense 😜 Honestly neither of them looks that good to me but I'll revert the reformatting
for (final TaskId id : tasksOnLocalStorage()) {
    if (isRunning(id)) {
        taskOffsetSums.put(id, Task.LATEST_OFFSET);
I'm just a tiny bit uncomfortable with re-using that sentinel, because the correctness of our logic depends on the active sentinel being less than the standby sentinel, so it must be less than zero. Do we have a reason to believe that Task.LATEST_OFFSET would never change to a number that would spoil us here, such as zero?
I actually changed this based on working on the next PR, as Task#changelogOffsets uses this sentinel for exactly the same thing, ie an indicator that the task is running (and active). This is only used in computing the lag info for KIP-535, which has a similar desire to differentiate between a running task that is completely caught up and any other. So, I can't imagine this being changed -- but I can add a comment to the constant explaining it should always be negative (not sure why it's "-2" specifically, as opposed to "-1", do you?)
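One way to make the "should always be negative" assumption harder to break than a comment alone is a guard next to the constant, sketched below. The class RunningSentinel is hypothetical and only mirrors the value -2 mentioned above; it is not Kafka's Task interface, the real constant's type may differ, and the sketch assumes real offset sums are never negative.

```java
final class RunningSentinel {
    // Hypothetical mirror of the sentinel discussed above. It must stay negative so that it
    // cannot collide with a real offset sum (assumed to be >= 0) and sorts below any of them.
    static final long LATEST_OFFSET = -2L;

    static {
        // Fails at class-load time if someone changes the constant to zero or a positive value.
        if (LATEST_OFFSET >= 0) {
            throw new IllegalStateException("running-task sentinel must be negative");
        }
    }

    // True only for the sentinel, never for a real (non-negative) offset sum.
    static boolean isRunning(final long offsetSum) {
        return offsetSum == LATEST_OFFSET;
    }

    public static void main(final String[] args) {
        System.out.println(isRunning(LATEST_OFFSET)); // prints true
        System.out.println(isRunning(0L));            // prints false
    }
}
```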
test this please
Wow, the tests actually ran AND they all passed on the first try?? Amazing
"First" PR for KIP-441: implement the protocol change so we can encode the task lag info in the subscription